Spectrometers

Author

Yiwei Mao

Published

November 9, 2023

Ocean Insight Spectrometers

Ocean Insight (formerly Ocean Optics) have a user interface to operate their spectrometers called OceanView. The install files can be found at https://www.oceaninsight.com/support/software-downloads/oceanview-2.0.10-downloads-nolm/. The GUI allows you to inspect the spectra, change settings, and save timestamped data. However, after using it, I found it inconvenient for data analysis because the data files are not immediately accessible. The first data row has one more element than the following rows making it annoying to read into a DataFrame without manual indexing. A few settings are multiple clicks away and it is also not easy to zoom in a specific wavelength range of interest and expand the y-axis scale.

There is a Python API to operate their spectrometers which can be easily installed using

pip install seabreeze

Then there are further install instructions for Linux and Windows (none for MacOS). Follow the install guide here https://python-seabreeze.readthedocs.io/en/latest/install.html. Python packages can be used in Julia with PyCall.jl.

This lets you connect to spectrometers, change the exposure and trigger mode, and view the wavelengths and raw counts as a numpy arrays.

Code
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
Code
import xarray as xr
import holoviews as hv
hv.extension("bokeh", logo=False)
import matplotlib.pyplot as plt

from datetime import datetime, timezone
from scipy.interpolate import interp1d
Code
from nbdev.showdoc import show_doc
Code
from fastcore.foundation import patch
from fastcore.dispatch import typedispatch # only types of first two arguments used
from tqdm import tqdm

from holoviews import opts
from holoviews.streams import Pipe, Buffer
Code

class DateTimeBuffer():
    """Records timestamps in UTC time."""
    def __init__(self, n:int = 16) -> None:
        """Initialise a nx1 array and write index"""
        self.data = np.arange(n).astype(datetime)
        self.n = n
        self.write_pos = 0
        
    def __getitem__(self, key:slice) -> datetime:
        return self.data[key]

    def update(self) -> None:
        """Stores current UTC time in an internal buffer when this method is called."""
        ts = datetime.timestamp(datetime.now())
        self.data[self.write_pos] = datetime.fromtimestamp(ts, tz=timezone.utc)
        self.write_pos += 1

        # Loop back if buffer is full
        if self.write_pos == self.n:
            self.write_pos = 0

show_doc(DateTimeBuffer)

DateTimeBuffer

 DateTimeBuffer (n:int=16)

Records timestamps in UTC time.

Code
show_doc(DateTimeBuffer.update)

DateTimeBuffer.update

 DateTimeBuffer.update ()

Stores current UTC time in an internal buffer when this method is called.

Code
from typing import Iterable, Union, Callable, List, TypeVar, Generic, Tuple, Optional

class SpectraBuffer(object):
    """Circular FIFO buffer for spectral measurements"""
    def __init__(self, 
                 nbands:int=8, # number of spectral pixels
                 nlines:int=4, # length of buffer
                 dtype:type = np.uint16 # data type. raw counts use unit16 but radiance use float32
                ) -> None:
        """Preallocate data array"""
        self.size = (nlines,nbands)
        self.data = np.zeros(self.size,dtype=dtype)
        self.write_pos = 0
        self.read_pos = 0
        self.slots_left = nlines
        self.time_buff = DateTimeBuffer(n=nlines)
        
    def __getitem__(self, key:slice):
        return self.data[key]
    
    def _inc(self, idx:int) -> int:
        """Increment read/write index with wrap around"""
        idx += 1
        if idx == self.size[0]: idx = 0
        return idx
    
    def is_empty(self) -> bool:
        return self.slots_left == self.size[0]

    def put(self, line:np.ndarray) -> None:
        """Place spectra into the buffer"""
        self.data[self.write_pos,:] = line
        self.time_buff.update()
        
        # if buffer full, update read position to keep track of oldest slot
        self.slots_left -= 1
        if self.slots_left < 0:
            self.slots_left = 0
            self.read_pos = self._inc(self.read_pos)
        
        self.write_pos = self._inc(self.write_pos)
            
    def get(self) -> np.ndarray:
        """Reads the oldest (n-1)darray from the buffer"""
        if self.slots_left < self.size[0]:
            val = self.data[self.read_pos,:]
            self.slots_left += 1
            self.read_pos = self._inc(self.read_pos)
            return val
        else:
            return None
        
show_doc(SpectraBuffer)

SpectraBuffer

 SpectraBuffer (nbands:int=8, nlines:int=4, dtype:type=<class
                'numpy.uint16'>)

Circular FIFO buffer for spectral measurements

Type Default Details
nbands int 8 number of spectral pixels
nlines int 4 length of buffer
dtype type uint16 data type. raw counts use unit16 but radiance use float32
Returns None
Code
show_doc(SpectraBuffer.put)

SpectraBuffer.put

 SpectraBuffer.put (line:numpy.ndarray)

Place spectra into the buffer

Code
show_doc(SpectraBuffer.get)

SpectraBuffer.get

 SpectraBuffer.get ()

Reads the oldest (n-1)darray from the buffer

Code

class OceanSpectro(SpectraBuffer):
    
    def __init__(self, 
                 exposure:int=40, # exposure time in ms
                 nlines:int=128,  # length of buffer to preallocate
                 serial_num:str="FLMS01766", # spectrometer name "FLMS01766" or "QEP00994"
                 trigger:bool=False,         # trigger on rising edge or not (normal)
                 radiometric:bool=False,     # radiance stored using floating point or raw counts using integers
                ) -> None:
        """
        Initialise the Ocean Insight spectrometers. 
        """
        from seabreeze.spectrometers import Spectrometer
        try: 
            self.spec = Spectrometer.from_serial_number(serial_num) if serial_num else Spectrometer.from_first_available()
            self.spec.integration_time_micros(exposure*1000)
            self._exposure = exposure
            self.trigger(trigger)

            self.wavelengths = self.spec.wavelengths()
            self.get_spectra()
        except: 
            print("Device already opened. Close with `self.close()` then try again.")
            
        self.n = nlines
        super().__init__(nbands=len(self.wavelengths),nlines=nlines,dtype=np.float32 if radiometric else np.uint16)
        
        
    @property
    def exposure(cls):
        return cls._exposure
    @exposure.setter
    def exposure(cls,val):
        cls._exposure = val
        cls.spec.integration_time_micros(val*1000)

    def get_spectra(self) -> np.array:
        """Grab spectral measurment"""
        self.start()
        self.last_spectra = self.spec.intensities()
        self.close()
        return self.last_spectra
    
    def start(self):
        """open spectrometer if closed"""
        try: self.spec.open()
        except: print("Please initialise the spectrometer first")
        
    def close(self):
        """close connection to spectrometer"""
        self.spec.close()
        
    def __repr__(self) -> str:
        return f"Spectrometer {self.spec.serial_number}. Exposure time = {self.exposure} ms. Wavelength range = {self.wavelengths[0]:.2f} to {self.wavelengths[-1]:.2f} nm."
    
    def trigger(self, 
                val:bool=False, # trigger mode
               ) -> None:
        """normal mode or hardware rising edge trigger mode"""
        # see https://www.oceaninsight.com/globalassets/catalog-blocks-and-images/manuals--instruction-ocean-optics/electronic-accessories/external-triggering-options_firmware3.0andabove.pdf
        if val:
            self.spec.trigger_mode(3)
        else:
            self.spec.trigger_mode(0)
            
    def collect(self) -> None:
        """Fill up the spectral buffer"""
        self.start()
        for i in tqdm(range(self.n)):
            self.put( self.spec.intensities() )
        self.close()
        
    def average(self,n:int) -> np.array:
        """Measure `n` spectra and return the average."""
        temp = np.zeros((n,len(self.wavelengths)),dtype=np.float32)
        self.start()
        for i in tqdm(range(n)):
            temp[i,:] = self.spec.intensities()
        self.close()
        return np.mean(temp,axis=0)
        
    def show(self,
             plot_lib="bokeh", # plotting backend 'bokeh' or 'matplotlib'
             savedir:str=None, # save directory string
            ) -> hv.Curve:     # plot object
        """Plot spectra with option to save plot"""
        hv.extension(plot_lib, logo=False)
        curve = hv.Curve( (self.wavelengths,self.last_spectra) ).opts(xlabel="wavelength (nm)",ylabel="counts")
        
        if plot_lib == "bokeh":
            curve = curve.opts(width=1000,height=250)
        else: # plot_lib == "matplotlib"
            curve = curve.opts(fig_inches=12,aspect=3)
        if savedir:
            curve.savefig(f"{savedir}/{self.time_buff.data[0].strftime('%Y_%m_%d')}/{self.spec.serial_number}_{datetime.now().strftime('%Y_%m_%d-%H_%M_%S')}.pdf",bbox_inches='tight', pad_inches=0)
        return curve
    
    def waterfall(self,
                  plot_lib:str="bokeh", # plotting backend 'bokeh' or 'matplotlib'
                  wavelen_range:tuple=None, # wavelength nm range as tuple (start_nm, end_nm)
                  savedir:str=None, # save directory string. only possible with plotlib='matplotlib'
                ) -> hv.Image:      # plot object
        """Plot spectral timeseries"""
        hv.extension(plot_lib, logo=False)
        if not wavelen_range:
            start_idx = 0
            end_idx   = len(self.wavelengths)
        else:
            start_idx = np.sum(self.wavelengths<wavelen_range[0])
            end_idx   = np.sum(self.wavelengths<wavelen_range[1])
        img = hv.Image( self.data[:,start_idx:end_idx] ).opts(colorbar=True,cmap="Viridis")
        
        if plot_lib == "bokeh":
            img = img.opts(width=500,height=500)
        else: # plot_lib == "matplotlib"
            img = img.opts(fig_inches=12,aspect=1.)
        if savedir:
            fig, ax = plt.subplots(figsize=(10,10))
            ax.imshow(self.data[:,start_idx:end_idx]); ax.set_xlabel("wavelength index"); ax.set_ylabel("time index")
            fig.savefig(f"{savedir}/{self.spec.serial_number}_{self.time_buff.data[0].strftime('%Y_%m_%d-%H_%M_%S')}.png",bbox_inches='tight', pad_inches=0)
        return img
    
    def get_live_waterfall(self) -> hv.DynamicMap:
        """Produce a dynamic live image that is updated when `self.runtime` is called."""
        def rescale(x, in_min, in_max, out_min, out_max):
            return (x - in_min) * (out_max - out_min) / (in_max - in_min) + out_min
        self.spec_stream = Pipe([])
        self.img_dmap = hv.DynamicMap(hv.Image,streams=[self.spec_stream],).opts(xlabel="wavelength (nm)",ylabel="seconds ago")
        self.img_dmap.opts(xlim=(-0.5,0.5),ylim=(-0.5,0.5),width=500,height=500,cmap="plasma",colorbar=True,toolbar='above',
             xticks=[(i,int(rescale(i,-0.5,0.5,oo.wavelengths[0],oo.wavelengths[-1]))) for i in np.arange(-0.5,0.51,0.08)],
             yticks=[(i,np.round(rescale(i,-0.5,0.5,0,oo.n*oo.exposure/1000),1)) for i in np.arange(-0.5,0.51,0.1)],yformatter='%.1f s',
             title=f"Spectrometer {oo.spec.serial_number}. Exposure time = {oo.exposure} ms.")
        return self.img_dmap
    
    def runtime(self,
                seconds:float=10, # seconds to run the spectrometer
               ) -> None:
        """Run the spectrometer for `seconds` long and fill up the spectral buffer for `self.img_dmap`."""
        self.start()
        for i in tqdm(range(int(seconds*1000/self.exposure))):
            self.put( self.spec.intensities() )
            temp = self.data.copy()
            temp[:self.n-self.read_pos,:] = self.data[self.read_pos:,:]
            temp[self.n-self.read_pos:,:] = self.data[:self.read_pos,:]
            self.spec_stream.send(temp)
        self.close()
    

    def get_AC_diff(self:OceanSpectro) -> None:
        self.diff_arr = np.abs( np.float32(self.data[1:,:])-np.float32(self.data[:-1,:]) )
        return self.diff_arr

    
show_doc(OceanSpectro)

OceanSpectro

 OceanSpectro (exposure:int=40, nlines:int=128,
               serial_num:str='FLMS01766', radiometric:bool=False)

Circular FIFO buffer for spectral measurements

Type Default Details
exposure int 40 exposure time in ms
nlines int 128 length of buffer to preallocate
serial_num str FLMS01766 spectrometer name “FLMS01766” or “QEP00994”
radiometric bool False radiance stored using floating point or raw counts using integers
Returns None
Code
show_doc(OceanSpectro.get_spectra)

OceanSpectro.get_spectra

 OceanSpectro.get_spectra ()

Grab spectral measurment

Code
show_doc(OceanSpectro.start)

OceanSpectro.start

 OceanSpectro.start ()

open spectrometer if closed

Code
show_doc(OceanSpectro.close)

OceanSpectro.close

 OceanSpectro.close ()

close connection to spectrometer

Code
show_doc(OceanSpectro.trigger)

OceanSpectro.trigger

 OceanSpectro.trigger (val:bool=False)

guessing these values

Type Default Details
val bool False trigger mode
Returns None
Code
show_doc(OceanSpectro.collect)

OceanSpectro.collect

 OceanSpectro.collect ()

Fill up the spectral buffer

Code
show_doc(OceanSpectro.average)

OceanSpectro.average

 OceanSpectro.average (n:int)

Measure n spectra and return the average.

Code
show_doc(OceanSpectro.show)

OceanSpectro.show

 OceanSpectro.show (plot_lib='bokeh', savedir:str=None)

Plot spectra with option to save plot

Type Default Details
plot_lib str bokeh plotting backend ‘bokeh’ or ‘matplotlib’
savedir str None save directory string
Returns Curve plot object
Code
show_doc(OceanSpectro.waterfall)

OceanSpectro.waterfall

 OceanSpectro.waterfall (plot_lib:str='bokeh', wavelen_range:tuple=None,
                         savedir:str=None)

Plot spectral timeseries

Type Default Details
plot_lib str bokeh plotting backend ‘bokeh’ or ‘matplotlib’
wavelen_range tuple None wavelength nm range as tuple (start_nm, end_nm)
savedir str None save directory string. only possible with plotlib=‘matplotlib’
Returns Image plot object
Code
show_doc(OceanSpectro.get_live_waterfall)

OceanSpectro.get_live_waterfall

 OceanSpectro.get_live_waterfall ()

Produce a dynamic live image that is updated when self.runtime is called.

Code
show_doc(OceanSpectro.runtime)

OceanSpectro.runtime

 OceanSpectro.runtime (seconds:float=10)

Run the spectrometer for seconds long and fill up the spectral buffer for self.img_dmap.

Type Default Details
seconds float 10 seconds to run the spectrometer
Returns None

Using the spectrometer

Code
oo = OceanSpectro(nlines=128)
oo.get_spectra()
oo.show()

You can collect all nlines at once and plot it using OceanSpectro.waterfall

Code
oo.collect()
oo.waterfall()
100%|█████████████████████████████████████████| 128/128 [00:05<00:00, 24.98it/s]

Then you can save it as a HDF5 file.

Code

@patch
def save(self:OceanSpectro,
        savedir:str = ".") -> None:
    """save spectral buffer into HDF5 format"""
    attrs = {"exposure":self.exposure,"spectrometer":self.spec.serial_number}
    #self.directory = self.directory = f"{savedir}/{self.time_buff.data[0].strftime('%Y_%m_%d')}"
    
    self.coords = dict(wavelength=(["wavelength"],self.wavelengths),
                       time=(["time"],self.time_buff.data.astype(np.datetime64)) )
    self.nc = xr.Dataset(data_vars=dict(datacube=(["time","wavelength"],self.data)),
                             coords=self.coords, 
                             attrs=attrs)
    self.nc.time.attrs["long_name"]   = "UTC time"
    self.nc.time.attrs["description"] = "UTC time for each measurement"
    self.nc.wavelength.attrs["long_name"]   = "wavelength_nm"
    self.nc.wavelength.attrs["units"]       = "nanometers"
    self.nc.wavelength.attrs["description"] = "wavelength in nanometers."
    
    self.nc.to_netcdf(f"{savedir}/{self.spec.serial_number}_{self.time_buff.data[0].strftime('%Y_%m_%d-%H_%M_%S')}_spectra.h5")
    self.waterfall(plot_lib="matplotlib",savedir=savedir)

show_doc(OceanSpectro.save)

OceanSpectro.save

 OceanSpectro.save (savedir:str='.')

save spectral buffer into HDF5 format

Code
oo.save("../../data/test/")
/var/folders/fn/jjhdyhvd3_98hz2qmws897lw0000gn/T/ipykernel_13769/1695057546.py:9: DeprecationWarning: parsing timezone aware datetimes is deprecated; this will raise an error in the future
  time=(["time"],self.time_buff.data.astype(np.datetime64)) )

And load the save file back up.

Code

@patch
def load(self:OceanSpectro,
        fname:str, # path to h5 or netcdf4 file
        ) -> None:
    ds = xr.open_dataset(fname)
    self._exposure = ds.attrs["exposure"]
    self.wavelengths = ds.coords["wavelength"]
    self.n = ds.datacube.shape[0]
    self.size = ds.datacube.shape
    self.data = ds.datacube.data
    self.write_pos = ds.datacube.shape[1]-1
    self.read_pos  = 0
    self.slots_left = self.n
    self.time_buff.data = ds.coords["time"]

show_doc(OceanSpectro.load)

OceanSpectro.load

 OceanSpectro.load (fname:str)
Type Details
fname str path to h5 or netcdf4 file
Returns None
Code
oo2 = OceanSpectro()
oo2.load("../../data/test/FLMS01766_2023_11_11-02_29_16_spectra.h5")
oo2.waterfall()

You can also view an updating waterfall plot by running the following two code cells together (in order).

Code
oo.get_live_waterfall()
Code
oo.runtime(10) # seconds
100%|█████████████████████████████████████████| 250/250 [00:14<00:00, 17.73it/s]

Measuring Dark Current

Code
@patch
def measure_dark(self:OceanSpectro,
                fname:str=None) -> None:
    x = self.average(100)
    self.dark_df = pd.DataFrame(x,columns=["dark"])
    if savedir:
        self.dark_df.to_csv(fname)
    return self.dark_df

show_doc(OceanSpectro.measure_dark)
dark
0 16.990566
1 29068.427734
2 2367.610107
3 2321.225586
4 2317.972900
... ...
2043 2747.008545
2044 2742.784912
2045 2741.450684
2046 2741.450684
2047 2741.450684

2048 rows × 1 columns

Code
@patch
def load_dark(self:OceanSpectro,
              fname:str) -> None:
    self.dark_df = pd.read_csv(fname)

show_doc(OceanSpectro.load_dark)

OceanSpectro.load_dark

 OceanSpectro.load_dark (fname:str)
Code
@patch
def measure_sphere(self:OceanSpectro,
                   fname:str=None) -> None:
    x = self.average(100)
    self.sphere_df = pd.DataFrame(x,columns=["sphere"])
    if savedir:
        self.sphere_df.to_csv(fname)
    return self.sphere_df

show_doc(OceanSpectro.measure_sphere)

OceanSpectro.measure_sphere

 OceanSpectro.measure_sphere (fname:str=None)
Code
@patch
def load_sphere(self:OceanSpectro,
                fname:str) -> None:
    self.sphere_df = pd.read_csv(fname)
    
show_doc(OceanSpectro.load_sphere)

OceanSpectro.load_sphere

 OceanSpectro.load_sphere (fname:str)
Code
@patch
def correct_dark(self:OceanSpectro,
                ) -> None:
    
    pass

show_doc(OceanSpectro.correct_dark)
Code
@patch
def correct_sphere(self:OceanSpectro,
                ) -> None:
    pass

show_doc(OceanSpectro.correct_sphere)
Code
@patch
def dn2rad(self:OceanSpectro,
                ) -> None:
    # temp buff record
    # x = self.correct_sphere( self.correct_dark() )
    pass

show_doc(OceanSpectro.dn2rad)

QEpro

Work in progress

To be continued…

Flame

Code
oo = OceanSpectro(nlines=128,serial_num="QEP00994")
oo.spec.trigger_mode(3)
oo.get_spectra()
oo.show()
SeaBreezeError: Error: Invalid trigger mode
Code
oo.spec.trigger_mode(3)
oo.get_spectra()
oo.show()
SeaBreezeError: Error: Invalid trigger mode
Code
#oo.spec.trigger_mode(3)
oo.get_spectra()
oo.show()
Code
oo.close()
Code

#import seabreeze
#seabreeze.use('cseabreeze')

#seabreeze.use('pyseabreeze')

from seabreeze.spectrometers import Spectrometer

spec = Spectrometer.from_first_available()
spec.integration_time_micros(40_000)
spec.wavelengths()

spec.trigger_mode(3)
Code
spec.intensities()
array([1633., 1616., 1617., ..., 1616., 1615., 1614.])
Code
import matplotlib.pyplot as plt
plt.plot(spec.wavelengths(), spec.intensities())
SeaBreezeError: Error: Data transfer error
Code
spec.trigger_mode(3)
plt.plot(spec.wavelengths(), spec.intensities())

Code
spec.model
'USB2000PLUS'
Code
plt.plot(spec.spectrum()[0],spec.spectrum()[1])

Code
spec.intensities()
array([1.69905556e+01, 2.90684133e+04, 2.36896889e+03, ...,
       2.72091611e+03, 2.72091611e+03, 2.72091611e+03])
Code
spec.close()
Code
spec.open()
SeaBreezeError: Error: No device found
Code
import numpy as np
x = np.arange(400,700)
y = np.sin(x/20)

hv.Curve( (x,y) )
Code
2**16
65536
Code

def find_AC_peak_heights(data:np.ndarray) -> np.array:
    """"""
    temp = data[1:,:] - data[:-1,:]
    pass